package com.github.bluesbruce.mqtt.web;

import java.nio.charset.StandardCharsets;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helper that publishes UTF-8 text messages to an MQTT broker.
 *
 * <p>All calls share one {@link MqttClient}, which is created and connected the first time
 * the nested {@code Singleton} enum is initialized (i.e. on the first {@link #publish} call).
 * Broker address, client id and credentials are compile-time constants of this class.
 *
 * @author BBF
 */
public class MqttClientSender {

  private static final Logger LOGGER = LoggerFactory.getLogger(MqttClientSender.class);
  private static final String MQTT_HOST = "tcp://127.0.0.1:61613";
  private static final String MQTT_CLIENT_ID = "mqttConsumer_tester";
  // NOTE(review): credentials are hard-coded in source; consider externalizing them.
  private static final String MQTT_USERNAME = "admin";
  private static final String MQTT_PASSWORD = "password";
  /**
   * Upper bound, in milliseconds, on how long {@link #publish} waits for the delivery token
   * to complete before treating the publish as failed.
   */
  private static final long PUBLISH_TIMEOUT_MILLIS = 10000L;

  /**
   * Publishes {@code message} (encoded as UTF-8) to {@code topic}.
   *
   * <p>The call is best-effort: if the shared client is missing or disconnected, or if the
   * delivery fails or does not complete within {@code PUBLISH_TIMEOUT_MILLIS}, the problem is
   * logged and the method returns normally. Success is logged only after the delivery token
   * has completed.
   *
   * @param topic   topic to publish to
   * @param message message text; sent as UTF-8 bytes
   * @param qos     quality of service level, one of 0, 1 or 2
   */
  public static void publish(String topic, String message, int qos) {
    MqttClient client = Singleton.INSTANCE.instance;
    if (client == null || !client.isConnected()) {
      LOGGER.warn("MqttClient不可用");
      return;
    }
    MqttTopic mqttTopic = client.getTopic(topic);
    if (mqttTopic == null) {
      LOGGER.warn("发布通道不可用");
      return;
    }
    MqttMessage data = new MqttMessage();
    data.setQos(qos);
    data.setPayload(message.getBytes(StandardCharsets.UTF_8));
    try {
      MqttDeliveryToken token = mqttTopic.publish(data);
      // MqttTopic.publish returns before delivery has finished, so wait (bounded) for the
      // token to complete; a timeout or delivery error surfaces as an exception below.
      token.waitForCompletion(PUBLISH_TIMEOUT_MILLIS);
      LOGGER.info("消息发送成功");
    } catch (Exception e) {
      // Pass the throwable as the last argument so SLF4J records the stack trace and cause.
      LOGGER.error("消息发送失败：{}", e.getMessage(), e);
    }
  }

  /**
   * Enum-based singleton that owns the shared {@link MqttClient}.
   */
  private enum Singleton {
    /**
     * The single enum constant; its constructor creates and connects the client.
     */
    INSTANCE;

    /**
     * Shared client. {@code null} if construction failed; non-null but disconnected if only
     * the connect step failed.
     */
    private final MqttClient instance;

    /**
     * Creates the client with in-memory persistence and attempts to connect using the
     * class-level host, client id and credentials. Failures are logged, not rethrown, so
     * that enum initialization itself never fails.
     */
    Singleton() {
      MqttClient client = null;
      try {
        client = new MqttClient(MQTT_HOST, MQTT_CLIENT_ID, new MemoryPersistence());
        MqttConnectOptions option = new MqttConnectOptions();
        option.setCleanSession(true);
        option.setUserName(MQTT_USERNAME);
        option.setPassword(MQTT_PASSWORD.toCharArray());
        option.setConnectionTimeout(30);
        option.setKeepAliveInterval(20);
        // NOTE(review): Paho documents automatic reconnect as applying after an established
        // connection is lost; if this initial connect fails the client presumably stays
        // disconnected for the life of the JVM - confirm and add a retry if that matters.
        option.setAutomaticReconnect(true);
        // No callback is registered on the client: it is used for publishing only.
        client.connect(option);
      } catch (Exception e) {
        // Use the class logger instead of printStackTrace so the failure reaches the
        // configured log output together with its stack trace.
        LOGGER.error("MqttClient初始化失败：{}", e.getMessage(), e);
      }
      instance = client;
    }
  }
}
